package com.z.sink

import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import java.sql.PreparedStatement

/**
 * @Author wenz.ma
 * @Date 2021/10/29 11:06
 * @Desc Reads text lines from a socket, processes them with Flink, and routes them into two MySQL tables.
 */
object SocketSinkMysql {

  // Minimum number of comma-separated fields each record type must carry
  // (the type prefix plus the columns bound by the matching INSERT statement).
  private val StuFieldCount = 3
  private val ProjectFieldCount = 5

  /** Returns true when `s` can be parsed by `Integer.parseInt`, i.e. the sink will not throw on it. */
  private def isInt(s: String): Boolean =
    scala.util.Try(Integer.parseInt(s)).isSuccess

  /** A `stu` record needs `stu,<sid:int>,<name>`; extra trailing fields are tolerated and ignored. */
  private def isValidStu(fields: Array[String]): Boolean =
    fields.length >= StuFieldCount && isInt(fields(1))

  /** A `project` record needs `project,<pid:int>,<sid:int>,<pro>,<score:int>`; extra fields are ignored. */
  private def isValidProject(fields: Array[String]): Boolean =
    fields.length >= ProjectFieldCount && isInt(fields(1)) && isInt(fields(2)) && isInt(fields(4))

  /** Batching/retry settings shared by both JDBC sinks. */
  private def executionOptions: JdbcExecutionOptions =
    JdbcExecutionOptions.builder()
      .withBatchSize(1000)
      .withBatchIntervalMs(200)
      .withMaxRetries(5)
      .build()

  /**
   * Database connection settings shared by both JDBC sinks.
   *
   * NOTE(review): host, user and password are hard-coded here; consider moving them to
   * job parameters or external configuration before using this outside a test setup.
   */
  private def connectionOptions: JdbcConnectionOptions =
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      // The original author notes that the character encoding must be specified in the URL.
      .withUrl("jdbc:mysql://server120:3306/flink_test?characterEncoding=utf8")
      .withDriverName("com.mysql.jdbc.Driver")
      .withUsername("flink_test")
      .withPassword("flink_test")
      .build()

  /** Builds a JDBC sink for `sql` using the shared execution and connection options. */
  private def mysqlSink(sql: String, builder: JdbcStatementBuilder[String]) =
    JdbcSink.sink[String](sql, builder, executionOptions, connectionOptions)

  /** Binds `stu,<sid>,<name>` onto `insert into stu (sid,name) values(?,?)`. */
  private def stuStatementBuilder: JdbcStatementBuilder[String] =
    new JdbcStatementBuilder[String]() {
      override def accept(stmt: PreparedStatement, line: String): Unit = {
        val fields = line.split(",") // split once instead of once per column
        stmt.setInt(1, Integer.parseInt(fields(1)))
        stmt.setString(2, fields(2))
      }
    }

  /** Binds `project,<pid>,<sid>,<pro>,<score>` onto the four-column `project` insert. */
  private def projectStatementBuilder: JdbcStatementBuilder[String] =
    new JdbcStatementBuilder[String]() {
      override def accept(stmt: PreparedStatement, line: String): Unit = {
        val fields = line.split(",") // split once instead of once per column
        stmt.setInt(1, Integer.parseInt(fields(1)))
        stmt.setInt(2, Integer.parseInt(fields(2)))
        stmt.setString(3, fields(3))
        stmt.setInt(4, Integer.parseInt(fields(4)))
      }
    }

  /**
   * Job entry point: reads comma-separated lines from the socket `server120:9999`, routes each
   * line by its first field into the `stu` or `project` side output, and writes each side output
   * to the MySQL table of the same name. Lines that are of an unknown type, too short, or carry
   * non-integer values in integer columns are sent to a `rejected` side output and printed to
   * stderr instead of failing the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputStream = env.socketTextStream("server120", 9999)

    /**
     * Test tables:
     * create table stu(sid int,name varchar(25))charset=utf8;
     * create table project(pid int,sid int,pro varchar(25),score int)charset=utf8;
     * Test data:
     * stu,1001,张三
     * stu,1002,李四
     * project,10001,1001,语文,90
     * project,10002,1002,语文,99
     * project,10003,1001,数学,79
     * project,10004,1002,数学,120
     */

    // Side-output tags are created once and shared by the router and the consumers below,
    // rather than being allocated again for every record.
    val stuTag = new OutputTag[String]("stu")
    val projectTag = new OutputTag[String]("project")
    val rejectedTag = new OutputTag[String]("rejected")

    val routedStream = inputStream
      // Drop null and empty lines.
      .filter(line => line != null && line.nonEmpty)
      // Route every line to exactly one side output; nothing is emitted on the main output.
      .process(new ProcessFunction[String, String]() {
        override def processElement(line: String,
                                    ctx: ProcessFunction[String, String]#Context,
                                    out: Collector[String]): Unit = {
          val fields: Array[String] = line.split(",")
          // headOption: a line such as "," splits into an empty array, so fields(0) would throw.
          fields.headOption.map(_.toLowerCase) match {
            case Some("stu") if isValidStu(fields) =>
              ctx.output(stuTag, line)
            case Some("project") if isValidProject(fields) =>
              ctx.output(projectTag, line)
            case _ =>
              // Unknown type or malformed payload: keep the job alive and surface the record.
              ctx.output(rejectedTag, line)
          }
        }
      })

    // One JDBC sink per record type.
    routedStream
      .getSideOutput(stuTag)
      .addSink(mysqlSink("insert into stu (sid,name) values(?,?)", stuStatementBuilder))
    routedStream
      .getSideOutput(projectTag)
      .addSink(mysqlSink("insert into project (pid,sid,pro,score) values(?,?,?,?)", projectStatementBuilder))

    // Make rejected records visible instead of dropping them silently.
    routedStream.getSideOutput(rejectedTag).printToErr()

    env.execute()
  }
}
